package cn.tenmg.flink.jobs.launcher;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import cn.tenmg.flink.jobs.FlinkJobsLauncher.FlinkJobsInfo.State;
import cn.tenmg.flink.jobs.config.model.FlinkJobs;
import cn.tenmg.flink.jobs.launcher.context.FlinkJobsLauncherContext;
import cn.tenmg.flink.jobs.launcher.utils.HttpClientUtils;
import cn.tenmg.flink.jobs.launcher.utils.Sets;

/**
 * 基于命令行的flink-jobs应用程序启动器
 * 
 * @author June wjzhao@aliyun.com
 * 
 * @since 1.0.0
 */
public class CommandLineFlinkJobsLauncher extends AbstractFlinkJobsLauncher {

	/** Logger shared by the launcher and its nested stream-catcher threads. */
	private static final Logger log = LoggerFactory.getLogger(CommandLineFlinkJobsLauncher.class);

	/** {@code true} when the {@code os.name} system property contains "windows" (case-insensitive). */
	private static final boolean isWindows = System.getProperty("os.name", "").toLowerCase().contains("windows");

	/** Separators used while assembling the command line. */
	private static final char BLANK_SPACE = ' ', EQUAL = '=';

	private static final String LINE_SEPARATOR = System.lineSeparator();

	/*
	 * Text markers searched for in the output of the flink command, plus the "-D" property prefix and the
	 * option key used to pass the YARN application name.
	 */
	private static final String APPLICATION_ID_PREFFIX = "Submitting application master",
			JOB_ID_PREFIX = "Job has been submitted with JobID ", JOB_FINISHED_PREFIX = "Job with JobID ",
			JOB_FINISHED_SUFFIX = " has finished", CURRENT_STATE_PREFFIX = "Deploying cluster, current state",
			RUNNING_LOG = "Starting execution of program",
			YARN_RUNNING_LOG = "YARN application has been deployed successfully",
			FINISHED_LOG = "Program execution finished", PROPERTIES_KEY_PREFIXX = "-D",
			YARN_APPLICATION_NAME_KEY = PROPERTIES_KEY_PREFIXX + "yarn.application.name",
			YARN_FINISHED_LOG = "YARN application has been finished successfully", EXCEPTION = "Exception",
			SAVEPOINT_PATH_PREFIX = "Savepoint completed. Path: ";

	/** Matches "Job with JobID &lt;id&gt; has finished", where the id is a run of non-whitespace characters. */
	private static final Pattern JOB_FINISHED_PATTERN = Pattern
			.compile(JOB_FINISHED_PREFIX + "[\\S]+" + JOB_FINISHED_SUFFIX);

	/** Pre-computed marker lengths; these are constants and therefore declared final. */
	private static final int JOB_ID_PREFIX_LEN = JOB_ID_PREFIX.length(),
			APPLICATION_ID_PREFFIX_LEN = APPLICATION_ID_PREFFIX.length(),
			JOB_FINISHED_PREFIX_LEN = JOB_FINISHED_PREFIX.length(),
			JOB_FINISHED_SUFFIX_LEN = JOB_FINISHED_SUFFIX.length(),
			CURRENT_STATE_PREFFIX_LEN = CURRENT_STATE_PREFFIX.length(),
			SAVEPOINT_PATH_PREFIX_LEN = SAVEPOINT_PATH_PREFIX.length();

	/** Option keys that select the deployment target and the savepoint to restore from. */
	private static final Set<String> TARGET = Sets.as("-t", "--target"),
			FROM_SAVEPOINT = Sets.as("-s", "--fromSavepoint");

	/** Flink installation directory; when {@code null} the bare {@code flink} command is invoked. */
	private String flinkHome = FlinkJobsLauncherContext.getProperty("commandline.flink.home");

	/** Flink CLI action used for launching, "run" unless configured otherwise. */
	private Action action = Action.fromString(FlinkJobsLauncherContext.getProperty("commandline.launch.action", "run"));

	/** File name prefix of the temporary launch script. */
	private String tempFilePrefix = FlinkJobsLauncherContext.getProperty("commandline.launch.temp_file_prefix",
			"flink-jobs_");

	/** Base URL of the YARN REST API; may be {@code null} when YARN is not used. */
	private String yarnRest = FlinkJobsLauncherContext.getProperty("commandline.yarn.rest");

	/** Maximum number of YARN REST queries made while looking for the submitted application. */
	private int yarnApplicationCheckAttempts = Integer
			.parseInt(FlinkJobsLauncherContext.getProperty("commandline.yarn.application_check_attempts", "60"));

	/** Interval, in milliseconds, between two YARN application lookups. */
	private int timeMillisBetweenYarnApplicationCheckAttempts = Integer.parseInt(FlinkJobsLauncherContext
			.getProperty("commandline.yarn.time_millis_between_application_check_attempts", "3000"));

	/**
	 * Prefix that marks a job id as a YARN application id when stopping; it is also prepended to generated
	 * application names.
	 */
	private String yarnApplicationIdPrefix = FlinkJobsLauncherContext
			.getProperty("commandline.yarn.application_id_prefix", "application_");

	/**
	 * @return the Flink installation directory, or {@code null} when the bare {@code flink} command is used
	 */
	public String getFlinkHome() {
		return flinkHome;
	}

	/**
	 * @param flinkHome
	 *            the Flink installation directory
	 */
	public void setFlinkHome(String flinkHome) {
		this.flinkHome = flinkHome;
	}

	/**
	 * @return the Flink CLI action used when launching
	 */
	public Action getAction() {
		return action;
	}

	/**
	 * @param action
	 *            the Flink CLI action used when launching
	 */
	public void setAction(Action action) {
		this.action = action;
	}

	/**
	 * @return the base URL of the YARN REST API
	 */
	public String getYarnRest() {
		return yarnRest;
	}

	/**
	 * @param yarnRest
	 *            the base URL of the YARN REST API
	 */
	public void setYarnRest(String yarnRest) {
		this.yarnRest = yarnRest;
	}

	/**
	 * @return the file name prefix of the temporary launch script
	 */
	public String getTempFilePrefix() {
		return tempFilePrefix;
	}

	/**
	 * @param tempFilePrefix
	 *            the file name prefix of the temporary launch script
	 */
	public void setTempFilePrefix(String tempFilePrefix) {
		this.tempFilePrefix = tempFilePrefix;
	}

	/**
	 * Returns the same value as {@link #getYarnApplicationCheckAttempts()}.
	 * 
	 * @return yarnApplicationCheckAttempts
	 * @deprecated use {@link #getYarnApplicationCheckAttempts()} instead
	 */
	@Deprecated
	public int getYarnRestAttempts() {
		return yarnApplicationCheckAttempts;
	}

	/**
	 * Sets the same value as {@link #setYarnApplicationCheckAttempts(int)}.
	 * 
	 * @param yarnRestAttempts
	 *            yarnApplicationCheckAttempts
	 * @deprecated use {@link #setYarnApplicationCheckAttempts(int)} instead
	 */
	@Deprecated
	public void setYarnRestAttempts(int yarnRestAttempts) {
		this.yarnApplicationCheckAttempts = yarnRestAttempts;
	}

	/**
	 * @return the maximum number of YARN REST queries made while looking for the submitted application
	 */
	public int getYarnApplicationCheckAttempts() {
		return yarnApplicationCheckAttempts;
	}

	/**
	 * @param yarnApplicationCheckAttempts
	 *            the maximum number of YARN REST queries made while looking for the submitted application
	 */
	public void setYarnApplicationCheckAttempts(int yarnApplicationCheckAttempts) {
		this.yarnApplicationCheckAttempts = yarnApplicationCheckAttempts;
	}

	/**
	 * @return the configured number of milliseconds between two YARN application lookups
	 */
	public int getTimeMillisBetweenYarnApplicationCheckAttempts() {
		return timeMillisBetweenYarnApplicationCheckAttempts;
	}

	/**
	 * @param timeMillisBetweenYarnApplicationCheckAttempts
	 *            the number of milliseconds between two YARN application lookups
	 */
	public void setTimeMillisBetweenYarnApplicationCheckAttempts(int timeMillisBetweenYarnApplicationCheckAttempts) {
		this.timeMillisBetweenYarnApplicationCheckAttempts = timeMillisBetweenYarnApplicationCheckAttempts;
	}

	/**
	 * @return the prefix that marks a job id as a YARN application id
	 */
	public String getYarnApplicationIdPrefix() {
		return yarnApplicationIdPrefix;
	}

	/**
	 * @param yarnApplicationIdPrefix
	 *            the prefix that marks a job id as a YARN application id
	 */
	public void setYarnApplicationIdPrefix(String yarnApplicationIdPrefix) {
		this.yarnApplicationIdPrefix = yarnApplicationIdPrefix;
	}

	/**
	 * Builds the {@code flink <action> ...} command for the given configuration, writes it to a temporary script,
	 * runs the script and gathers the job id and state from the output of the process. When the output yields no
	 * job id and an application name was passed to the command, the YARN REST API is polled for an application of
	 * that name.
	 * 
	 * @param flinkJobs
	 *            the job configuration to launch
	 * @return the collected job information; when launching fails with an exception that carries a message, the
	 *         state is {@code FAILED} and the message is recorded
	 * @throws Exception
	 *             when launching fails with an exception that carries no message
	 */
	@Override
	public FlinkJobsInfo launch(FlinkJobs flinkJobs) throws Exception {
		String jar = getJar(flinkJobs);
		int jarIndex = validateJar(jar);
		String defaultAppName = flinkJobs.getServiceName();
		if (defaultAppName == null) {
			// Fall back to the jar file name without its directory part
			int begin = jar.lastIndexOf('/');
			if (begin < 0) {
				begin = jar.lastIndexOf('\\');
			}
			defaultAppName = jar.substring(begin + 1, jarIndex);
		}

		StringBuilder commandBuilder = new StringBuilder();
		// NOTE(review): the working directory is "<separator>bin" (not "<flinkHome>/bin") when flinkHome is set;
		// kept as-is, confirm whether this is intended
		String dirPath = File.separator;
		if (flinkHome == null) {
			commandBuilder.append("flink");
		} else {
			dirPath += "bin";
			commandBuilder.append(flinkHome).append(File.separator).append("bin").append(File.separator)
					.append("flink");
		}
		commandBuilder.append(BLANK_SPACE).append(action.getName());
		String appName = attachOptions(commandBuilder, defaultAppName, flinkJobs.getOptions(),
				flinkJobs.isAllwaysNewJob());
		String entryPointClassName = getEntryPointClassName(flinkJobs);
		if (entryPointClassName != null) {
			commandBuilder.append(BLANK_SPACE).append("-c").append(BLANK_SPACE).append(entryPointClassName);
		}
		commandBuilder.append(BLANK_SPACE).append(jar);

		String arguments = getArguments(flinkJobs);
		if (!isEmptyArguments(arguments)) {
			commandBuilder.append(BLANK_SPACE).append(encode(arguments));
		}

		String command = commandBuilder.toString();
		log.info("Execute command: {}", command);

		// Upper bound for waiting on each output reader once the process has exited
		final long catcherJoinMillis = 5000L;
		Process p = null;
		File temp = null;
		FlinkJobsInfo appInfo = new FlinkJobsInfo();
		try {
			temp = File.createTempFile(tempFilePrefix, isWindows ? ".bat" : ".sh");
			temp.setExecutable(true);
			String[] cmdarray = writeLaunchScript(temp, command);
			p = Runtime.getRuntime().exec(cmdarray, null, new File(dirPath));
			Thread outputCatcher = new InputStreamCatcher(p.getInputStream(), appInfo);
			Thread errorCatcher = new ErrorStreamCatcher(p.getErrorStream(), appInfo);
			outputCatcher.start();
			errorCatcher.start();
			p.waitFor();
			// Let the readers finish before their results are used; otherwise the job id may not be stored yet
			outputCatcher.join(catcherJoinMillis);
			errorCatcher.join(catcherJoinMillis);
			if (appInfo.getJobId() == null && appName != null) {
				if (yarnRest == null) {
					log.warn("Please set the yarnRest for the launcher to get applicationId");
				} else {
					lookupYarnApplication(appName.trim(), appInfo);
				}
			}
		} catch (Exception e) {
			if (e instanceof InterruptedException) {
				Thread.currentThread().interrupt();// keep the interrupt visible to the caller
			}
			String message = e.getMessage();
			if (message == null) {
				throw e;
			}
			appInfo.setState(State.FAILED);
			appInfo.setMessage(message);
		} finally {
			if (p != null) {
				p.destroy();
			}
			// Delete the script right away; fall back to deletion at JVM exit only when that fails
			if (temp != null && !temp.delete()) {
				temp.deleteOnExit();
			}
		}
		return appInfo;
	}

	/**
	 * Writes the launch script and returns the command array that executes it. On non-Windows systems the content
	 * of {@code /etc/profile}, when that file exists, is copied in front of the command.
	 */
	private static String[] writeLaunchScript(File script, String command) throws IOException {
		FileWriter w = new FileWriter(script);
		try {
			if (isWindows) {// Windows command
				w.write(command);
				return new String[] { "cmd", "/C", script.getAbsolutePath() };
			}
			// Unix command
			File profile = new File("/etc/profile");
			if (profile.exists()) {
				try (FileReader fr = new FileReader(profile)) {
					char[] buffer = new char[4096];
					int count;
					while ((count = fr.read(buffer)) != -1) {
						w.write(buffer, 0, count);
					}
				} catch (IOException e) {
					// Best effort: the command is still written when the profile cannot be copied
					log.warn("Unable to copy /etc/profile into the launch script", e);
				}
			}
			w.write(LINE_SEPARATOR);
			w.write(command);
			return new String[] { "/bin/sh", "-c", script.getAbsolutePath() };
		} finally {
			w.close();
		}
	}

	/**
	 * Polls the YARN REST API for an application with the given name and stores its id and, when it maps to a
	 * known state, its state on {@code appInfo}. At most {@code yarnApplicationCheckAttempts} queries are made.
	 */
	private void lookupYarnApplication(String appName, FlinkJobsInfo appInfo) throws Exception {
		String appsUrl = yarnRest + (yarnRest.endsWith("/") ? "" : "/") + "apps";
		int attemptsLeft = yarnApplicationCheckAttempts;
		String jobId = null;
		State state = null;
		while (jobId == null && attemptsLeft-- > 0) {
			long millisBegin = System.currentTimeMillis();
			String result = HttpClientUtils.get(appsUrl);
			// Every level may be missing or null, e.g. {"apps":null} when no application is listed
			JSONObject json = result == null ? null : JSON.parseObject(result);
			JSONObject apps = json == null ? null : json.getJSONObject("apps");
			JSONArray appArr = apps == null ? null : apps.getJSONArray("app");
			if (appArr != null) {
				for (int i = 0, size = appArr.size(); i < size; i++) {
					JSONObject app = appArr.getJSONObject(i);
					if (app != null && appName.equals(app.getString("name"))) {
						jobId = app.getString("id");
						String yarnState = app.getString("state");
						if (yarnState != null) {
							try {
								state = State.valueOf(yarnState);
							} catch (IllegalArgumentException e) {
								// An unmapped state must not turn a submitted application into a failure
								log.warn("Unknown state '{}' of yarn application '{}'", yarnState, appName);
							}
						}
						break;
					}
				}
			}
			if (jobId == null && attemptsLeft > 0) {
				// Wait for the rest of the configured interval before asking again
				long millisLeft = timeMillisBetweenYarnApplicationCheckAttempts
						- (System.currentTimeMillis() - millisBegin);
				if (millisLeft > 0) {
					Thread.sleep(millisLeft);
				}
			}
		}
		if (jobId != null) {
			appInfo.setJobId(jobId);
		}
		if (state != null) {
			appInfo.setState(state);
		}
	}

	/**
	 * Stops the job with the given id. An id starting with {@code yarnApplicationIdPrefix} is treated as a YARN
	 * application and killed through the YARN REST API; any other id is passed to {@code flink stop}.
	 * 
	 * @param jobId
	 *            the id of the job to stop, must not be {@code null}
	 * @return the savepoint path found in the output of {@code flink stop}, or {@code null} when none was found
	 *         or when a YARN application was killed
	 * @throws IllegalArgumentException
	 *             when {@code jobId} is {@code null}, or when it denotes a YARN application and {@code yarnRest}
	 *             is not set
	 * @throws Exception
	 *             when the request or the command fails
	 */
	@Override
	public String stop(String jobId) throws Exception {
		if (jobId == null) {
			throw new IllegalArgumentException("jobId must be not null");
		}
		if (jobId.startsWith(yarnApplicationIdPrefix)) {
			if (yarnRest == null) {
				throw new IllegalArgumentException("Job with jobId starting with prefix '" + yarnApplicationIdPrefix
						+ "' are considered as a yarn application, yarnRest must be set when using yarn");
			}
			HttpClientUtils.put(yarnRest + (yarnRest.endsWith("/") ? "" : "/") + "apps/" + jobId + "/state",
					"{\"state\":\"KILLED\"}");
			return null;
		}

		StringBuilder commandBuilder = new StringBuilder();
		// NOTE(review): the working directory is "<separator>bin" (not "<flinkHome>/bin") when flinkHome is set;
		// kept as-is, confirm whether this is intended
		String dirPath = File.separator;
		if (flinkHome == null) {
			commandBuilder.append("flink");
		} else {
			dirPath += "bin";
			commandBuilder.append(flinkHome).append(File.separator).append("bin").append(File.separator)
					.append("flink");
		}
		// NOTE(review): jobId is placed into a shell command unescaped; callers should pass trusted ids only
		commandBuilder.append(BLANK_SPACE).append("stop").append(BLANK_SPACE).append(jobId);
		String command = commandBuilder.toString();
		String[] cmdarray;
		if (isWindows) {// Windows command
			cmdarray = new String[] { "cmd", "/C", command };
		} else {// Unix command
			cmdarray = new String[] { "/bin/sh", "-c", command };
		}
		log.info("Execute command: {}", command);

		// Upper bound for waiting on the output reader once the process has exited
		final long catcherJoinMillis = 5000L;
		Process p = null;
		try {
			// stderr is merged into stdout so that it is drained too and the child cannot block on a full pipe
			p = new ProcessBuilder(cmdarray).directory(new File(dirPath)).redirectErrorStream(true).start();
			SavepointPathCatcher savepointPathCatcher = new SavepointPathCatcher(p.getInputStream());
			savepointPathCatcher.start();
			p.waitFor();
			// The reader may still be processing buffered output; wait for it before asking for the path
			savepointPathCatcher.join(catcherJoinMillis);
			return savepointPathCatcher.getSavepointPath();
		} finally {
			if (p != null) {
				p.destroy();
			}
		}
	}

	/**
	 * Appends the given options to the command and makes sure a YARN application name is passed whenever the job
	 * targets YARN.
	 * <p>
	 * Keys not starting with "-" are prefixed with "--". Keys starting with "-D" are written as {@code key=value},
	 * the savepoint option as {@code key=value}, every other option as {@code key value}. The target option is
	 * written last. The savepoint option is discarded when {@code allwaysNewJob} is set or when it has no value.
	 * 
	 * @param commandBuilder
	 *            the command being assembled
	 * @param defaultAppName
	 *            the name used to generate the application name when the options provide none
	 * @param options
	 *            the options to append, may be {@code null}
	 * @param allwaysNewJob
	 *            whether the job must start without restoring from a savepoint
	 * @return the generated YARN application name, or {@code null} when none was passed to the command
	 */
	protected String attachOptions(StringBuilder commandBuilder, String defaultAppName, Map<String, String> options,
			boolean allwaysNewJob) {
		String targetKey = null, target = null, appName = null;
		if (options != null) {
			for (Entry<String, String> entry : options.entrySet()) {
				String key = entry.getKey();
				if (!key.startsWith("-")) {
					key = "--" + key;
				}
				String value = entry.getValue();
				boolean hasValue = value != null && !value.isEmpty();
				if (TARGET.contains(key)) {
					if (hasValue) {// written after all other options
						targetKey = key;
						target = value;
					}
				} else if (FROM_SAVEPOINT.contains(key)) {
					if (allwaysNewJob) {
						log.info("Submit a new job, discard the option '{}'", key);
					} else if (hasValue) {
						commandBuilder.append(BLANK_SPACE).append(key).append(EQUAL).append(value);
					} else {
						// Without a path the option would be emitted as "key=null"
						log.warn("Discard the option '{}' because it has no value", key);
					}
				} else {
					commandBuilder.append(BLANK_SPACE).append(key);
					if (key.startsWith(PROPERTIES_KEY_PREFIXX)) {
						if (YARN_APPLICATION_NAME_KEY.equals(key)) {
							// An empty value falls back to the default name, just like a missing one
							appName = generateApplicationName(hasValue ? value : defaultAppName);
							commandBuilder.append(EQUAL).append(appName);
						} else if (hasValue) {
							commandBuilder.append(EQUAL).append(value);
						}
					} else if (hasValue) {
						commandBuilder.append(BLANK_SPACE).append(value);
					}
				}
			}
		}
		if (target == null) {
			if (Action.RUN_APPLICATION.equals(action)) {
				commandBuilder.append(BLANK_SPACE).append("-t").append(BLANK_SPACE).append("yarn-application");
				if (appName == null) {
					appName = generateApplicationName(defaultAppName);
					commandBuilder.append(BLANK_SPACE).append(YARN_APPLICATION_NAME_KEY).append(EQUAL).append(appName);
				}
			}
		} else {
			commandBuilder.append(BLANK_SPACE).append(targetKey).append(BLANK_SPACE).append(target);
			if (target.startsWith("yarn") && appName == null) {
				appName = generateApplicationName(defaultAppName);
				commandBuilder.append(BLANK_SPACE).append(YARN_APPLICATION_NAME_KEY).append(EQUAL).append(appName);
			}
		}
		return appName;
	}

	/**
	 * Wraps the given text in double quotes so that it is passed to the flink command as a single argument.
	 * <p>
	 * On non-Windows systems every character that stays special inside a double-quoted shell string (backslash,
	 * double quote, dollar sign and backtick) is escaped with a backslash, so the program receives the text
	 * unchanged. On Windows only double quotes and backticks are escaped, as before.
	 * 
	 * @param original
	 *            the text to quote
	 * @return the quoted text
	 */
	private static String encode(String original) {
		if (isWindows) {
			return '"' + original.replace("\"", "\\\"").replace("`", "\\`") + '"';
		}
		StringBuilder quoted = new StringBuilder(original.length() + 16);
		quoted.append('"');
		for (int i = 0, length = original.length(); i < length; i++) {
			char ch = original.charAt(i);
			if (ch == '\\' || ch == '"' || ch == '$' || ch == '`') {
				quoted.append('\\');
			}
			quoted.append(ch);
		}
		return quoted.append('"').toString();
	}

	/**
	 * Generates an application name made of {@code yarnApplicationIdPrefix}, the given service name, an underscore
	 * and the current time in milliseconds.
	 * 
	 * @param serviceName
	 *            the service name to embed
	 * @return the generated application name
	 */
	private String generateApplicationName(String serviceName) {
		StringBuilder applicationName = new StringBuilder();
		applicationName.append(yarnApplicationIdPrefix).append(serviceName);
		applicationName.append("_").append(System.currentTimeMillis());
		return applicationName.toString();
	}

	/**
	 * Flink command line actions supported for launching.
	 */
	public static enum Action {
		RUN("run"), RUN_APPLICATION("run-application");

		/** Actions indexed by their command line name. */
		private static final Map<String, Action> BY_COMMAND_NAME = new HashMap<String, Action>();

		static {
			for (Action candidate : values()) {
				BY_COMMAND_NAME.put(candidate.name, candidate);
			}
		}

		/** The name of the action as written on the command line. */
		private final String name;

		private Action(String name) {
			this.name = name;
		}

		/**
		 * @return the name of the action as written on the command line
		 */
		public String getName() {
			return name;
		}

		/**
		 * Resolves an action from its command line name (for example "run-application") or, when no such name
		 * exists, from its constant name (for example "RUN_APPLICATION").
		 * 
		 * @param action
		 *            the command line name or constant name
		 * @return the matching action
		 * @throws IllegalArgumentException
		 *             when neither a command line name nor a constant name matches
		 */
		public static Action fromString(String action) {
			Action byCommandName = BY_COMMAND_NAME.get(action);
			return byCommandName != null ? byCommandName : Action.valueOf(action);
		}

	}

	public static class InputStreamCatcher extends Thread {

		private final InputStream is;

		private final FlinkJobsInfo appInfo;

		public InputStreamCatcher(InputStream is, FlinkJobsInfo appInfo) {
			this.is = is;
			this.appInfo = appInfo;
		}

		@Override
		public void run() {
			BufferedReader r = null;
			FlinkJobsInfo.State state = FlinkJobsInfo.State.SUBMITTED;
			String line, jobId = null;
			try {
				r = new BufferedReader(new InputStreamReader(is));
				while ((line = r.readLine()) != null) {// 收集日志信息，并记录applicationId和state
					System.out.println(line);
					int index = line.indexOf(JOB_ID_PREFIX);
					if (index >= 0) {
						jobId = line.substring(JOB_ID_PREFIX_LEN).trim();
					} else {
						index = line.indexOf(APPLICATION_ID_PREFFIX);
						if (index >= 0) {
							jobId = line.substring(index + APPLICATION_ID_PREFFIX_LEN).trim();
						} else {
							index = line.indexOf(CURRENT_STATE_PREFFIX);
							if (index >= 0) {
								try {
									state = FlinkJobsInfo.State
											.valueOf(line.substring(index + CURRENT_STATE_PREFFIX_LEN).trim());
								} catch (Exception e) {
									state = FlinkJobsInfo.State.SUBMITTED;
								}
								break;
							} else if (line.contains(RUNNING_LOG) || line.contains(YARN_RUNNING_LOG)) {
								state = FlinkJobsInfo.State.RUNNING;
								break;
							} else if (line.contains(FINISHED_LOG) || line.contains(YARN_FINISHED_LOG)) {
								state = FlinkJobsInfo.State.FINISHED;
								break;
							} else {
								Matcher m = JOB_FINISHED_PATTERN.matcher(line);
								if (m.find()) {
									String group = m.group();
									jobId = group.substring(JOB_FINISHED_PREFIX_LEN,
											group.length() - JOB_FINISHED_SUFFIX_LEN);
								} else if (FlinkJobsInfo.State.SUBMITTED.equals(state)) {
									if (line.contains(EXCEPTION)) {
										state = FlinkJobsInfo.State.FAILED;
										StringBuffer message = new StringBuffer();
										message.append(line).append(LINE_SEPARATOR);
										while ((line = r.readLine()) != null) {
											System.out.println(line);
											message.append(line).append(LINE_SEPARATOR);
										}
										appInfo.setMessage(message.toString());
										break;
									}
								}
							}
						}
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				if (r != null) {
					try {
						r.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			}
			appInfo.setJobId(jobId);
			FlinkJobsInfo.State status = appInfo.getState();
			if (status == null) {
				synchronized (appInfo) {
					status = appInfo.getState();
					if (status == null) {
						appInfo.setState(state);
					}
				}
			}
		}
	}

	public static class ErrorStreamCatcher extends Thread {

		private final InputStream is;

		private final FlinkJobsInfo appInfo;

		public ErrorStreamCatcher(InputStream is, FlinkJobsInfo appInfo) {
			this.is = is;
			this.appInfo = appInfo;
		}

		@Override
		public void run() {
			BufferedReader r = null;
			try {
				r = new BufferedReader(new InputStreamReader(is));
				String line;
				while ((line = r.readLine()) != null) {
					if (line.contains(EXCEPTION)) {
						appInfo.setState(FlinkJobsInfo.State.FAILED);
						StringBuffer message = new StringBuffer();
						message.append(line).append(LINE_SEPARATOR);
						while ((line = r.readLine()) != null) {
							System.out.println(line);
							message.append(line).append(LINE_SEPARATOR);
						}
						appInfo.setMessage(message.toString());
						return;
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				if (r != null) {
					try {
						r.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			}
		}

	}

	public static class SavepointPathCatcher extends Thread {

		private final InputStream is;

		private String savepointPath;

		public String getSavepointPath() {
			return savepointPath;
		}

		public SavepointPathCatcher(InputStream is) {
			super();
			this.is = is;
		}

		@Override
		public void run() {
			BufferedReader r = null;
			String line;
			try {
				r = new BufferedReader(new InputStreamReader(is));
				int index;
				while ((line = r.readLine()) != null) {// 收集日志信息，并记录savepointPath
					System.out.println(line);
					index = line.indexOf(SAVEPOINT_PATH_PREFIX);
					if (index >= 0) {
						savepointPath = line.substring(index + SAVEPOINT_PATH_PREFIX_LEN).trim();
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				if (r != null) {
					try {
						r.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}

}
